<?php
declare (strict_types = 1);

namespace app\command;

use app\api\services\DelayOrderService;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class RabbitMQOrderDelayCallback extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('rabbitmq:order:delay:callback')
            ->setDescription('the rabbitmq:order:delay:callback command');
    }

    /**
     * @throws \ErrorException
     * @throws \Exception
     */
    protected function execute(Input $input, Output $output)
    {
        $config     = config('rabbitmq.hosts');
        $connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['username'],
            $config['password'],
            $config['vhost'],
        );
        $channel = $connection->channel();

        $conf = config('rabbitmq.delay_order_queue');

        //指定交换机类型为direct
        /**
         * Declares exchange
         *
         * @param string $exchange
         * @param string $type
         * @param bool $passive
         * @param bool $durable
         * @param bool $auto_delete
         * @param bool $internal
         * @param bool $nowait
         * @return mixed|null
         * https://help.aliyun.com/zh/apsaramq-for-rabbitmq/developer-reference/error-codes-returned-when-amqp-methods-are-called
         * channel.exchangeDeclare("${exchangeName}", "只能取值x-delayed-message", true, false, arguments);
         * ExchangeTypeNotSupport[x-delayed-message]
         */
        $channel->exchange_declare($conf['exchange_name'],
            'x-delayed-message', true, false, false, false, false
            , new AMQPTable([
                'x-delayed-type' => AMQPExchangeType::DIRECT,
            ])
        );
        /**
         * Declares queue, creates if needed
         *
         * @param string $queue
         * @param bool $passive
         * @param bool $durable
         * @param bool $exclusive
         * @param bool $auto_delete
         * @param bool $nowait
         * @param null $arguments
         * @param null $ticket
         * @return mixed|null
         */
        $args = new AMQPTable(['x-delayed-type' => 'direct']);

        $channel->queue_declare($conf['queue_name'],
            false,
            true,
            false,
            false,
            false,
            $args
        );
        //将队列名与交换器名进行绑定，并指定routing_key
        $channel->queue_bind(
            $conf['queue_name'],
            $conf['exchange_name'],
            $conf['route_key']
        );

        // 消费者处理消息的回调函数
        $callback = function ($msg) {
            echo ' [x] Received ', date('Y-m-d H:i:s') . $msg->body, "\n";
            // 处理接收到的消息
            $data = $msg->body ? json_decode($msg->body, true) : [];
            DelayOrderService::getInstance()->order($data);
            //确认消息已被消费，从生产队列中移除
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        //设置消费成功后才能继续进行下一个消费
        $channel->basic_qos(0, 1, false);
        // 设置消费者来消费延迟队列的消息
        $channel->basic_consume($conf['queue_name'], '', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
        // 指令输出
        $output->writeln('rabbitmq:order:delay:callback');
    }
}
